JWT 的介绍


注意: JWT 不局限于在 Django 下使用,也可以在 Flask 等其他框架 或 其他语言下使用

1. JWT 的简介

  • jwt(JSON Web Tokens),是一种开发的行业标准 RFC 7519 ,用于安全的表示双方之间的声明

  • 目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目

2. JWT 认证流程


在项目开发中,一般会按照上图所示的过程进行认证,即:用户登录成功之后,服务端给用户浏览器返回一个token,以后用户浏览器要携带token再去向服务端发送请求,服务端校验token的合法性,合法则给用户看数据,否则,返回一些错误信息


3. 传统 token方式 和 JWT 在认证方面有什么差异?

  • 传统token方式

    • 用户登录成功后,服务端生成一个随机 token 给用户,并且在服务端数据库或缓存)中保存一份token,以后用户再来访问时需携带 token,服务端接收到 token 之后,去 数据库 或 缓存 中进行校验 token 的是否超时、是否合法

  • jwt方式

    • 用户登录成功后,服务端通过 jwt 生成一个随机 token 给用户(服务端无需保留token),以后用户再来访问时需携带 token,服务端接收到 token 之后,通过 jwt 对 token 进行校验是否超时、是否合法

JWT 的原理


1. 用户提交用户名和密码给客户端进行验证,如果登陆成功,使用 JWT 创建一个 token,并且返回给用户

  • jwt 生成的 token 是由三段字符串组成的,并且用 “.” 连接起来

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

  • 第一段字符串: HEADER 部分 

    • 内容包括: 算法类型 和 token 类型

{
  "alg": "HS256",  # 算法类型
  "typ": "JWT"  # token 类型
}

    • 然后,对 HEADER 部分进行 base64url 加密,然后得到第一部分的字符串(密文)

      • base64url 加密是先做 base64加密,然后再将 - 替代 + 及 _ 替代 / 。

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9

  • 第二段字符串: PAYLOAD 部分

    • 一般存储着自定义的内容(即: 想返回给前端的内容)

{
  "id": "1",
  "name": "Kevin",
  "exp": 1516239022  # token 的超时时间,一般都要有该属性
  ...
}

    • 然后,对 PAYLOAD 部分进行 base64url 加密,然后得到第二部分的字符串(密文)

      • base64url 加密是先做 base64加密,然后再将 - 替代 + 及 _ 替代 / 。

eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ

  • 第三段字符串: SIGNATURE 部分

    • 第一步: 将 第一段密文 和 第二段密文 用 “.” 拼接起来

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ

    • 第二步: 对第一步拼接后得到的字符串进行 HS256 加密 + 加盐

    • 第三步: 对 HS256 加密后的密文再做 base64url 加密,然后得到第三部分的字符串(密文)

SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

    • jwt 的源码表现

base64url(
    HMACSHA256(
      base64UrlEncode(header) + "." + base64UrlEncode(payload),
      your-256-bit-secret (秘钥加盐)
    )
)

2. 以后用户访问接口的时候都需要携带 token 进行访问,然后后端需要对 token 进行校验

  • 校验 token 的第一步: 获取 token

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

  • 校验 token 的第二步: 以 “.” 作为分隔符对 token 进行分割

第一部分: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9

第二部分: eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ

第三部分: SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

  • 校验 token 的第三步: 对 token 的第二部分进行 base64url 解密,并获取到 PAYLOAD 信息(即: 自定义的内容信息,返回给前端的内容),然后检测 token 是否过期

{
  "id": "1",
  "name": "Kevin",
  "exp": 1516239022  # token 的超时时间,一般都要有该属性
  ...
}

  • 校验 token 的第四步: 

    • 第一步: 将 token 的第一部分字符串 和 token 的第一部分字符串 用 “.” 拼接起来

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ

    • 第二步: 对第一步拼接后得到的字符串进行 HS256 加密 + 加盐,得到 密文一

    • 第三步: 对 token 的第三部分进行 base64url 解密,得到 密文二

    • 第四步: 判断 密文一 和 密文二 是否相等,如果相等,表示 token 未被修改过(认证通过)

JWT 的使用


1. JWT 的安装

pip3 install pyjwt -i https://pypi.douban.com/simple # 使用豆瓣的镜像

2. JWT 的使用(未对代码进行封装处理)

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models
from django.conf import settings

import jwt
from jwt import exceptions
import datetime


class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')

        user_obj = models.UserInfo.objects.filter(username=username, password=password).first()

        if not user_obj:
            return Response({'code': 1000, 'error': '用户名或密码错误'})

# -------------------- 使用 jwt 构造 token -----------------------

# 盐
        SALT = settings.SECRET_KEY

# 构造 header
        headers = {
            'typ': 'jwt',  # token 类型
            'alg': 'HS256'  # 算法类型
        }

# 构造 payload
        payload = {
            'user_id': user_obj.pk,  # 自定义用户id
            'username': user_obj.username,  # 自定义用户名
            'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)  # token 超时时间
        }

        token = jwt.encode(
            headers=headers,  # 头部信息
            payload=payload,  # 自定义内容(即: 返回给前端的内容)
            key=SALT,  # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
            algorithm="HS256",  # 加密方式
        ).decode('utf-8')

        return Response({'code': 1001, 'token': token})


class OrderView(APIView):
    def get(self, request):

# -------------------- 使用 jwt 验证 token -----------------------

# 从 url 参数中获取 token
        token = request.query_params.get('token')
# 从 请求头 中获取 token
        # token = request.META.get('HTTP_AUTHORIZATION')

# 盐
        SALT = settings.SECRET_KEY

        payload = None
        msg = ''
        try:
 # 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
            payload = jwt.decode(token, SALT, True)  # True 表示进行 token 的校验
        except exceptions.ExpiredSignatureError:
            msg = 'token已失效(已过期)'
        except jwt.DecodeError:
            msg = 'token认证失败'
        except jwt.InvalidTokenError:
            msg = '非法的token'

        if not payload:
            return Response({'code': 1002, 'error': msg})

        return Response({'code': 1003, 'data': payload})



3. JWT 的使用(对代码进行了封装处理)

  • 获取 token

# utils/auth_token.py

import jwt
import datetime
from django.conf import settings


def get_token(payload, timeout):
"""
    :param payload: 自定义的内容(即: 返回给前端的内容)
    :param timeout: 超时时间
    :return: token
    """

 # 盐
    SALT = settings.SECRET_KEY

# 构造 header
    headers = {
        'typ': 'jwt',  # token 类型
        'alg': 'HS256'  # 算法类型
    }

# 将超时时间添加到 payload 中
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)  # token 超时时间

    token = jwt.encode(
        headers=headers,  # 头部信息
        payload=payload,  # 自定义内容(即: 返回给前端的内容)
        key=SALT,  # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
        algorithm="HS256",  # 加密方式
    ).decode('utf-8')

    return token

  • token 的校验

# extensions/auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions as rf_exceptions

import jwt
from jwt import exceptions as jwt_exceptions
from django.conf import settings


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):

 # 从 url 参数中获取 token
        token = request.query_params.get('token')
# 从 请求头 中获取 token
token = request.META.get('HTTP_AUTHORIZATION')

# 盐
        SALT = settings.SECRET_KEY

        try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
            payload = jwt.decode(token, SALT, True)  # True 表示进行 token 的校验

return payload, token

        except jwt_exceptions.ExpiredSignatureError:
            raise rf_exceptions.AuthenticationFailed({'code': 1003, 'msg': 'token已失效(已过期)'})

        except jwt.DecodeError:
            raise rf_exceptions.AuthenticationFailed({'code': 1004, 'msg': 'token认证失败'})

        except jwt.InvalidTokenError:
            raise rf_exceptions.AuthenticationFailed({'code': 1005, 'msg': '非法的token'})

  • 视图

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from app01 import models

from app01.utils.auth_token import get_token
from app01.extensions.auth import TokenAuth


class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')

        user_obj = models.UserInfo.objects.filter(username=username, password=password).first()

        if not user_obj:
            return Response({'code': 1000, 'error': '用户名或密码错误'})

# 获取 token
        token = get_token({
            'user_id': user_obj.pk,  # 自定义用户id
            'username': user_obj.username,  # 自定义用户名
        }, 1)

        return Response({'code': 1001, 'token': token})


class OrderView(APIView):
    authentication_classes = [TokenAuth]

    def get(self, request):
        return Response({
            'code': 1006,
            'data': {
                'user_id': request.user['user_id'],
                'username': request.user['username']
            }
        })



JWT 刷新 Token 的思路与实现


auto_jwt_demo.rar

1. 思路

  • 准备两个 token

    • token(过期时间短) -> 用于平常的验证过期

    • refresh_token(过期时间长) -> refresh_token 用于当 token 过期时,换取新的 token 值,以及一个新的 refresh_token 值

  • 当客户端在发送请求前,需要检测 token 是否过期

    • 情况一: 当 token 没有过期,继续发送请求

    • 情况二: 当 token 已过期,发送刷新 token 的请求,根据返回的状态码进行判断是否继续请求或重新登陆

      • ① 如果后端检测 refresh_token 正确且没有过期,返回正确的状态码,然后前端更新 Vuex 中的 token 继续发送请求

      • ② 如果后端检测 refresh_token 错误或者过期,返回错确的状态码,然后前端需要重新登陆获取 token

  • 当服务端接收到 token 时

    • 情况一: 当 token 正确且没有过期,返回对应的数据信息

    • 情况二: 当 token 错误或者过期,返回错误的数据信息

  • 当服务端刷新 token 是

    • 情况一: 当 refresh_token 正确且没有过期,返回新的 token、refresh_token、token 过期时间 

    • 情况二: 当 refresh_token 错误或者过期,返回错误的数据信息

2. 后端实现

  • 获取 token 和 token 过期时间

# utils/auth_token.py

import jwt
import datetime
from django.conf import settings


class MyToken(object):
    def __init__(self):
        self.typ = 'jwt'  # token 类型
        self.alg = "HS256"  # 算法类型
        self.SALT = settings.SECRET_KEY  # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐

 # 获取 token 超时时间
    def get_token_time(self, time_type, exp, is_utc=True):
        if is_utc:
            not_time = datetime.datetime.utcnow()
        else:
            not_time = datetime.datetime.now()
        if time_type == 'days':
            return not_time + datetime.timedelta(days=exp)
        elif time_type == 'hours':
            return not_time + datetime.timedelta(hours=exp)
        elif time_type == 'minutes':
            return not_time + datetime.timedelta(minutes=exp)
        elif time_type == 'seconds':
            return not_time + datetime.timedelta(seconds=exp)

# 获取 token 信息(包括: token、token 过期时间)
    def get_token_info(self, payload, time_type, exp):
# 构造 header
        headers = {
            'typ': self.typ,  # token 类型
            'alg': self.alg  # 算法类型
        }

# 将超时时间添加到 payload 中
        payload['exp'] = self.get_token_time(time_type, exp)  # token 超时时间

# 获取不是 utc 的 token 超时时间
        token_time = self.get_token_time(time_type, exp, is_utc=False)

        token = jwt.encode(
            headers=headers,  # 头部信息
            payload=payload,  # 自定义内容(即: 返回给前端的内容)
            key=self.SALT,  # 盐,使用 django 配置文件中的 SECRET_KEY 作为盐
            algorithm=self.alg,  # 加密方式
        ).decode('utf-8')

        return {
            'token': token,  # token
            'token_time': token_time  # token 过期时间
        }

  • token 的校验

# extensions/auth.py

from rest_framework.authentication import BaseAuthentication
from rest_framework import exceptions as rf_exceptions

import jwt
from jwt import exceptions as jwt_exceptions
from django.conf import settings


class TokenAuth(BaseAuthentication):
    def authenticate(self, request):

 # 从 url 参数中获取 token
        token = request.query_params.get('token')
# 从 请求头 中获取 token
token = request.META.get('HTTP_AUTHORIZATION')

# 盐
        SALT = settings.SECRET_KEY

        try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
            payload = jwt.decode(token, SALT, True)  # True 表示进行 token 的校验

return payload, token

        except jwt_exceptions.ExpiredSignatureError:
            raise rf_exceptions.AuthenticationFailed({'code': 1003, 'msg': 'token已失效(已过期)'})

        except jwt.DecodeError:
            raise rf_exceptions.AuthenticationFailed({'code': 1004, 'msg': 'token认证失败'})

        except jwt.InvalidTokenError:
            raise rf_exceptions.AuthenticationFailed({'code': 1005, 'msg': '非法的token'})

  • 视图

# views.py

from rest_framework.views import APIView
from rest_framework.response import Response
from api import models

from api.utils.auth_token import MyToken
from api.extensions.auth import TokenAuth

import jwt
from jwt import exceptions
from django.conf import settings


# 登陆
class LoginView(APIView):
    def post(self, request):
        username = request.data.get('username')
        password = request.data.get('password')

        user_obj = models.UserInfo.objects.filter(username=username, password=password).first()

        if not user_obj:
            return Response({'code': 1000, 'error': '用户名或密码错误'})

# 获取 token
        my_token = MyToken()

 # 短时间的 token,用于校验 token
        token_info = my_token.get_token_info(
            payload={
                'user_id': user_obj.pk,  # 自定义用户id
                'username': user_obj.username,  # 自定义用户名
            },
            time_type='seconds',
            exp=3
        )

 # 长时间的 token,用于更新 token
        refresh_token_info = my_token.get_token_info(
            payload={
                'user_id': user_obj.pk,  # 自定义用户id
                'username': user_obj.username,  # 自定义用户名
            },
            time_type='seconds',
            exp=5
        )

        return Response({'code': 1001, 'data': {
            'token': token_info['token'],
            'refresh_token': refresh_token_info['token'],
            'token_time': token_info['token_time'],
        '+'}'+'}'}})


# 更新 Token
class UpdateTokenView(APIView):
    def post(self, request):
# 获取请求头中 token
        refresh_token = request.META.get('HTTP_REFRESHAUTHORIZATION')

# 盐
        SALT = settings.SECRET_KEY

        payload = None
        msg = ''

        try:
# 验证 token 是否正确,如果 token 校验成功会返回 payload 中的内容(即: 自定义的内容)
            payload = jwt.decode(refresh_token, SALT, True)  # True 表示进行 token 的校验

# 获取 token
            my_token = MyToken()

 # 短时间的 token,用于校验 token
            token_info = my_token.get_token_info(
                payload={
                    'user_id': payload['user_id'],  # 自定义用户id
                    'username': payload['username'],  # 自定义用户名
                },
                time_type='seconds',
                exp=3
            )

 # 长时间的 token,用于更新 token
            refresh_token_info = my_token.get_token_info(
                payload={
                    'user_id': payload['user_id'],  # 自定义用户id
                    'username': payload['username'],  # 自定义用户名
                },
                time_type='seconds',
                exp=5
            )

            return Response({
                'code': 1007,
                'data': {
                    'token': token_info['token'],
                    'refresh_token': refresh_token_info['token'],
                    'token_time': token_info['token_time'],
                }
            })

        except exceptions.ExpiredSignatureError:
            msg = 'token已失效(已过期)'
        except jwt.DecodeError:
            msg = 'token认证失败'
        except jwt.InvalidTokenError:
            msg = '非法的token'

        if not payload:
            return Response({'code': 1008, 'error': msg})


# 订单
class OrderView(APIView):
    authentication_classes = [TokenAuth]

    def get(self, request):
        return Response({
            'code': 1006,
            'data': '订单数据'
        })

2. 前端实现(Vue)

  • 通过 axios 拦截器进行实现,在每次请求前验证 token 是否过期,如果已过期那么就刷新 token 后再发送请求

  • 注意: 如果出现了跨域问题,一定要用前端方法进行解决,不要使用后端方法进行解决,否则会没有效果

  • 使用前需要修改的地方

    • 拦截器中的白名单

    • 拦截器中的刷新token的请求网址

    • 拦截器中刷新token请求成功后返回的状态码

# src/api/ajax.js

import axios from 'axios'
import store from '../store'
import this_vue from '../main'
import {RECEIVE_TOKEN} from "../store/mutations-types";

// axios.defaults.baseURL = 'http://127.0.0.1:8010/api';
axios.defaults.baseURL = `./api`;

// axios.defaults.headers.post['Content-Type'] = 'application/x-www-form-urlencoded';

export default function (url, data = {}, type = "GET") {
    return new Promise((resolve, reject) => {
        let promise;

// 获取token的相关信息
        const token = store.state.token;
        const refresh_token = store.state.refresh_token;
        const token_time = store.state.token_time;

// axios 取消请求的相关参数
        const CancelToken = axios.CancelToken;
        const source = CancelToken.source();

// 每次请求都携带token
        axios.defaults.headers.common['Authorization'] = token;
        axios.defaults.headers.common['RefreshAuthorization'] = refresh_token;

// axios 请求拦截器
        let myInterceptor = axios.interceptors.request.use(async config => {

// 白名单
            let whitelist = [
                '/login/',
                '/update_token/',
            ];

            if (token && whitelist.indexOf(config.url) == -1) {

                let token_time_obj = new Date(token_time); // token 过期时间的时间戳
                let this_time_obj = new Date(); // 当前时间的时间戳

// 如果当前时间大于token过期时间那么就刷新 token
                if (token_time_obj.getTime() < this_time_obj.getTime()) {
// 请求刷新 token 的接口
                    let response = await axios.post('/update_token/');
                    let res = response.data;
                    if (res.code == '1007') {
                        this_vue.$store.commit(RECEIVE_TOKEN, {
                            token: res.data.token,
                            refresh_token: res.data.refresh_token,
                            token_time: res.data.token_time
                        });
                        config.headers.Authorization = res.data.token;
                        config.headers.RefreshAuthorization = res.data.refresh_token;
                        return config;
                    } else {
// 如果连 refresh_token 都过期,那么就需要重新登陆
                        config.cancelToken = source.token;
                        source.cancel('token已过期,请重新登陆');
                        this_vue.$router.replace({path: 'login'});
                        return config
                    }
                } else {
                    return config;
                }
            } else {
                return config;
            }

        }, error => {
 // 对请求错误做些什么
            return Promise.reject(error);
        });

 // 请求
        if (type === 'GET') {
            let dataStr = '';
            Object.keys(data).forEach(key => {
                dataStr += key + '=' + data[key] + '&';
            });
            if (dataStr !== '') {
                dataStr = dataStr.substring(0, dataStr.lastIndexOf('&'));
                url += '?' + dataStr;
            }
            promise = axios.get(url)
        } else {
            let type_lower = type.toLowerCase();
            promise = axios[type_lower](url, data)
        }

// 移除 axios 拦截器
        axios.interceptors.request.eject(myInterceptor);

        promise.then((response) => {
            resolve(response.data);
        }).catch((error) => {
            resolve(error);
        })
    });
}